package com.freedom.monitor.myeye.client.reap;

/**
 * author:刘志强
 * QQ: 837500869 
 * Email:837500869@qq.com
 */
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import com.freedom.monitor.myeye.client.queue.DataQueue;
import com.freedom.monitor.myeye.client.report.Report;
import com.freedom.monitor.myeye.client.report.ReportMap;
import com.freedom.monitor.myeye.client.report.Statistics;
import com.freedom.monitor.myeye.client.utils.Logger;
import com.freedom.monitor.myeye.client.utils.MachineUtils;
import com.freedom.monitor.myeye.client.utils.PartitionUtils;
import com.freedom.monitor.myeye.client.utils.PropertyUtils;
import com.freedom.monitor.myeye.commmon.model.IOModel;
import com.freedom.monitor.myeye.commmon.model.KeyModel;
import com.freedom.monitor.myeye.commmon.model.SubKeyModel;
import com.freedom.monitor.myeye.commmon.utils.CompressUtils;
import com.freedom.monitor.myeye.commmon.utils.GsonUtils;
import com.freedom.monitor.myeye.commmon.utils.StringUtils;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

@SuppressWarnings("unused")
public class MyReapRunnable implements Runnable {
	//
	//
	private static Logger logger = Logger.getLogger(MyReapRunnable.class);
	//
	//
	private static String COMPRESS_THRESHOLD = "compressThreshold";
	private static String SECRET_TOKEN = "token";
	private String product = null;
	private String service = null;
	private String productServiceKey = null;
	//
	private Gson gson = new GsonBuilder().create();
	private SimpleDateFormat dateFormat = new SimpleDateFormat("yy-MM-dd HH:mm:ss");
	//// 记住存在的不同subkey,自带记忆功能
	private Set<String> subKeyRemember = new HashSet<String>();// 只能存储不重复的对象

	//
	public MyReapRunnable(String p, String s) {
		product = p;
		service = s;
		productServiceKey = StringUtils.unionByMagicKey(p, s);
	}

	@SuppressWarnings("unchecked")
	private List<HashMap<String, Statistics>> collectSamples() {
		// 1)获取partition个数
		List<HashMap<String, Statistics>> sampleLists = new ArrayList<HashMap<String, Statistics>>();
		// 2)开始计算
		for (int index = 0; index < PartitionUtils.getPartition(); index++) {
			Object value = ReportMap.getAndRemove(StringUtils.unionByMagicKey(productServiceKey, "" + index));
			// 2.1)保证不为null
			if (null == value) {
				continue;
			}
			// 2.2)根据需要,归一化成HashMap<String,Statistics>()
			if (value instanceof Report) {
				HashMap<String, Statistics> map = new HashMap<String, Statistics>();
				map.put(((Report) value).getSubKey(), new Statistics(((Report) value)));
				value = map;
			} else {
				// 什么都不需要做
			}
			// 2.3)加入到List中
			sampleLists.add((HashMap<String, Statistics>) value);
		}
		return sampleLists;
	}

	private HashMap<String, Statistics> handle(List<HashMap<String, Statistics>> sampleLists, Set<String> subKeyZero,
			Statistics subKeySum) {
		// 最终汇聚成总的targetMap
		HashMap<String, Statistics> targetMap = new HashMap<String, Statistics>();
		//
		// 这里的对象都是引用,所以不用担心
		for (HashMap<String, Statistics> sampleMap : sampleLists) {
			Set<String> keys = sampleMap.keySet();
			for (String key : keys) {
				Statistics value = sampleMap.get(key);
				// 先维护subKeyRemember & subKeyZero
				subKeyRemember.add(key);// 尽可能的扩充,HashSet保证不会有重复的
				subKeyZero.remove(key);// 尽可能的删除
				// 再来维护subKeySum,进行累加
				subKeySum.addStatistics(value);
				// 最后再来维护targetMap
				Statistics oldStat = targetMap.get(key);
				if (null == oldStat) {// 出现了一个新的key
					targetMap.put(key, value);
				} else {// 累加到老的统计信息上
					oldStat.addStatistics(value);
				}
			}
		}
		// 尘埃落定,万物归宗
		// 还要加入掉0的subKey,有的业务会有掉0情况的检测，这样会及时发现问题
		for (String key : subKeyZero) {
			targetMap.put(key, new Statistics());
		}
		// 还要加上汇总数据,因为有的人想看到所有key的本次统计信息
		targetMap.put(StringUtils.GLOBAL_MAGIC_SUM, subKeySum);
		// OK,可以返回了
		// logger.debug("" + targetMap);
		return targetMap;
	}

	private List<KeyModel> build(String sampleTime, HashMap<String, Statistics> finalResult) {
		List<KeyModel> keyModelList = new ArrayList<KeyModel>();
		{
			// 1)填充大字段
			KeyModel keyModel = new KeyModel();
			keyModel.setSt(sampleTime);
			keyModel.setPn(product);
			keyModel.setSn(service);
			keyModel.setSid(MachineUtils.getServerID());
			// 2)填充小字段
			Statistics value;
			Set<String> keys = finalResult.keySet();
			for (String key : keys) {
				value = finalResult.get(key);
				keyModel.addKey(new SubKeyModel(StringUtils.format(key), value.getTotalCount(), value.getTotalSucceed(), //
						value.getTotalCostTime(), value.getMaxCostTime(), value.getMinCostTime()));
			}
			// 3)加入到数组列表里
			keyModelList.add(keyModel);
		}
		return keyModelList;
	}

	private void execute() {
		// 1)构造好所有采样的数据集合
		List<HashMap<String, Statistics>> sampleLists = collectSamples();
		// 2)先拿到采样时间
		String sampleTime = dateFormat.format(new Date());
		// 3)构造好一个调用次数为0的副本集
		Set<String> subKeyZero = new HashSet<String>(subKeyRemember);
		// 4)构造一个allKey
		Statistics subKeySum = new Statistics();
		// 5)开始遍历对象处理,并取得最终汇总结果
		HashMap<String, Statistics> finalResult = handle(sampleLists, subKeyZero, subKeySum);
		// 6)构造KeyModel
		List<KeyModel> keyModelList = build(sampleTime, finalResult);
		// 7)序列化成字符串
		String serializedString = gson.toJson(keyModelList);
		logger.debug("serializedString--->" + serializedString);
		// 8)判断是否需要压缩
		boolean compressed = false;
		int compressAlgorithm = 0;
		int compressThreshold = Integer.parseInt(PropertyUtils.getInstance().getProperty(COMPRESS_THRESHOLD, "8"));
		if (serializedString.length() > compressThreshold * 1024) {// KB
			if (null == (serializedString = CompressUtils.compress(serializedString))) {
				logger.error("compress data for [" + product + "] [" + service + "] error!!!");
				return;// 压缩失败,立刻返回
			}
			compressed = true;// 压缩成功
			compressAlgorithm = CompressUtils.KAFKA_LZ4;// 因为可能有各种各样的压缩算法，所以这里也标记一下
		} else {
			compressed = false;// 没有压缩
		}
		// 9)构造最终的入队Model,通过这种方式构造有些问题，所以直接用下面的拼接方式了
		IOModel ioModel = new IOModel();
		ioModel.setCp(compressed);
		ioModel.setCa(compressAlgorithm);
		ioModel.setCt(serializedString);
		// 10)完毕，终于结束了
		// DataQueue.insertData(productServiceKey,
		// "{"//
		// + "\"cp\":" + compressed + ","//
		// + "\"ca\":" + compressAlgorithm + ","//
		// + "\"ct\":" + serializedString //
		// + "}");
		 DataQueue.insertData(productServiceKey,gson.toJson(ioModel));
	}

	@Override
	public void run() {// 因为只要有异常，任务就彻底不执行了，所以要捕捉任何异常
		try {
			// long begin = System.currentTimeMillis();
			execute();
			// long end = System.currentTimeMillis();
			// logger.debug("reap Runnable --->" + (end - begin) + " ms");
		} catch (Throwable t) {
		}
	}

}
